module collie.codec.lengthbaseframe;

import std.bitmanip;
import std.conv;
import kiss.logger;
import kiss.event;

import collie.channel;
import collie.codec.exception;
import kiss.container.ByteBuffer;

/// Thrown when a frame's payload length is larger than the configured maximum.
class MsgLengthTooBig : CollieCodecException
{
    pure nothrow @nogc @safe this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}

/// Length-prefixed frame codec.
///
/// Frame layout on the wire:
///   - 4 bytes: payload length as a uint, e.g. "00 00 00 00"; byte order is
///     selected by the `littleEndian` template flag
///   - 1 byte : compress type, e.g. "00"
///   - N bytes: the payload (as returned by `doCompress` on the write side)
///
/// `read` reassembles frames from arbitrarily split input chunks and fires one
/// `fireRead` per complete frame. `write` wraps a message in a frame and passes
/// it down the pipeline via `fireWrite`.
class LengthBasedFrame(bool littleEndian = false) : Handler!(const(ubyte[]),ubyte[],ubyte[],StreamWriteBuffer)
{
    /// Params:
    ///   max          = largest payload length accepted on read and on write
    ///   compressType = compress-type byte initially passed to `doCompress`
    this(uint max, ubyte compressType = 0x00)
    {
        _max = max;
        _compressType = compressType;
    }

    /// Feeds a chunk of incoming bytes to the frame parser.
    ///
    /// Parser state is kept between calls, so a frame may be split across any
    /// number of chunks and one chunk may contain several frames.
    ///
    /// Throws: MsgLengthTooBig when a header announces a payload larger than
    ///         the configured maximum. The parser is reset to expect a new
    ///         header before the exception is thrown.
    final override void read(Context ctx, const(ubyte[]) msg)
    {
        // Delivers the completed frame and rearms the parser for the next header.
        void doFireRead()
        {
            if (_data.length > 0)
                _data = unCompress(_readComType, _data);
            ctx.fireRead(_data);
            _data = null;
            _pos = ReadPOS.Length_Begin;
        }

        size_t len = msg.length;
        for (size_t i = 0; i < len; ++i)
        {
            const ubyte ch = msg[i];
            final switch (_pos)
            {
            case ReadPOS.Length_Begin:
                _lenByte[0] = ch;
                _pos = ReadPOS.Length_1;
                break;
            case ReadPOS.Length_1:
                _lenByte[1] = ch;
                _pos = ReadPOS.Length_2;
                break;
            case ReadPOS.Length_2:
                _lenByte[2] = ch;
                _pos = ReadPOS.Length_End;
                break;
            case ReadPOS.Length_End:
                _lenByte[3] = ch;
                _pos = ReadPOS.Compress_Type;
                break;
            case ReadPOS.Compress_Type:
                _readComType = ch;
                _readLen = 0;
                _msgLen = endianToNative!(littleEndian, uint)(_lenByte);
                if (_msgLen == 0)
                {
                    // Empty payload: the frame is already complete.
                    doFireRead();
                    continue;
                }
                if (_msgLen > _max)
                {
                    // Reset before throwing: staying in the Body state with no
                    // buffer allocated would make the next read() slice out of
                    // bounds on _data.
                    _pos = ReadPOS.Length_Begin;
                    throw new MsgLengthTooBig("the max is : " ~ to!string(_max) ~ " the length is :" ~ to!string(_msgLen));
                }
                _data = new ubyte[_msgLen];
                _pos = ReadPOS.Body;
                break;
            case ReadPOS.Body:
                {
                    const size_t needLen = _msgLen - _readLen; // bytes still missing
                    const size_t canRead = len - i;            // bytes left in this chunk
                    if (canRead >= needLen)
                    {
                        // The rest of the payload is available: finish the frame.
                        auto tlen = i + needLen;
                        _data[_readLen .. _msgLen] = msg[i .. tlen];
                        // -1 because the for loop's ++i moves to the first
                        // byte after the payload.
                        i = tlen - 1;
                        doFireRead();
                    }
                    else
                    {
                        // Partial payload: stash what we have and wait for more.
                        auto tlen = _readLen + canRead;
                        _data[_readLen .. tlen] = msg[i .. len];
                        _readLen = cast(uint) tlen;
                        return;
                    }
                }
                break;
            }
        }
    }

    /// Frames `msg` and forwards it with `ctx.fireWrite`.
    ///
    /// Any Exception raised while framing is reported through `showException`
    /// and not rethrown. When `cback` is set it is called with the payload size
    /// on success and with 0 on failure.
    final override void write(Context ctx, ubyte[] msg, TheCallBack cback = null)
    {
        logDebug("writeln data!");
        try
        {
            ubyte ctype = _compressType;
            auto tmsg = doCompress(ctype, msg);
            // Check against the limit before narrowing to uint so that a
            // length above uint.max cannot wrap around and pass the check.
            if (tmsg.length > _max)
            {
                throw new MsgLengthTooBig("the max is : " ~ to!string(_max) ~ " the length is :" ~ to!string(tmsg.length));
            }
            uint size = cast(uint) tmsg.length;
            ubyte[] data = new ubyte[size + 5];
            ubyte[4] length = nativeToEndian!(littleEndian, uint)(size);
            data[0 .. 4] = length[];
            data[4] = ctype;
            data[5 .. $] = tmsg[];
            ctx.fireWrite(new SocketStreamBuffer(data, null), null);
            if (cback)
                cback(msg, size);
        }
        catch (Exception e)
        {
            import collie.utils.exception;
            showException(e);
            if (cback)
                cback(msg, 0);
        }
    }

protected:
    /// Hook for subclasses to compress an outgoing payload. `type` is passed by
    /// reference and its value after the call is written as the frame's
    /// compress-type byte. The default returns `data` unchanged.
    ubyte[] doCompress(ref ubyte type, ubyte[] data)
    {
        return data;
    }

    /// Hook for subclasses to decompress an incoming payload. `type` is the
    /// compress-type byte read from the frame header. The default returns
    /// `data` unchanged.
    ubyte[] unCompress(in ubyte type, ubyte[] data)
    {
        return data;
    }

private:
    /// Position of the read-side parser within a frame.
    enum ReadPOS : ubyte
    {
        Length_Begin,
        Length_1,
        Length_2,
        Length_End,
        Compress_Type,
        Body
    }

private:
    ubyte[] _data;          // payload buffer of the frame being assembled
    ubyte[4] _lenByte;      // raw length header bytes
    ubyte _readComType;     // compress-type byte of the frame being read
    uint _msgLen;           // payload length announced by the current header
    uint _readLen;          // payload bytes received so far
    ReadPOS _pos = ReadPOS.Length_Begin;

    uint _max;              // largest accepted payload length
    ubyte _compressType;    // compress type used for outgoing frames
}


unittest
{
    import collie.net;
    import kiss.net.TcpStream;
    import collie.channel.handlercontext;
    import std.stdio;

    // Accumulates every frame emitted through fireWrite.
    ubyte[] gloaData;

    // Minimal context: prints what it receives and records written frames.
    class Contex : HandlerContext!(ubyte[],StreamWriteBuffer)
    {
        override void fireRead(ubyte[] msg)
        {
            writeln("the msg is : ", cast(string) msg);
        }

        override void fireTimeOut()
        {
        }

        override void fireTransportActive()
        {
        }

        override void fireTransportInactive()
        {
        }

        override void fireWrite(StreamWriteBuffer msg, void delegate(StreamWriteBuffer, size_t) cback = null)
        {
            auto data = msg.sendData;
            gloaData ~= data;
            // Note: data[4 .. $] still starts with the compress-type byte.
            writeln("length is : ", data[0 .. 4], " \n the data is : ", cast(string)(data[4 .. $]));
        }

        override void fireClose()
        {
        }

        override @property PipelineBase pipeline()
        {
            return null;
        }

        override @property Transport transport()
        {
            return null;
        }
    }

    Contex ctx = new Contex();

    auto hander = new LengthBasedFrame!false(2048);
    string data = "i am a test string";
    ubyte[] tdata = cast(ubyte[]) data;
    hander.write(ctx, tdata);

    // Frame the already-framed bytes, so gloaData now holds two frames.
    hander.write(ctx, gloaData);

    // Whole buffer in one chunk.
    hander.read(ctx, gloaData);

    // Same buffer split mid-header and mid-payload.
    hander.read(ctx, gloaData[0 .. 3]);
    hander.read(ctx, gloaData[3 .. 20]);
    hander.read(ctx, gloaData[20 .. $]);

}